RabbitMQ系列(九)RabbitMQ进阶 |
您所在的位置:网站首页 › rabbitmq队列长度查询 python › RabbitMQ系列(九)RabbitMQ进阶 |
RabbitMQ进阶-Queue队列参数详解
文章目录
RabbitMQ进阶-Queue队列参数详解1.创建队列参数2.参数解析2.1 Message TTL2.2 Auto expire2.3 Max length2.4 Max length bytes2.5 Overflow behaviour2.6 Dead letter exchange2.7 Dead letter routing key2.8 Maximum priority2.9 Lazy mode2.10 Master locator
1.创建队列参数
我们看下队列参数 void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException; queue 队列名称durable 队列是否持久化,false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化exclusive 队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除autoDelete 如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)arguments ,args参数我们可以通过界面看下,一共下面这么多参数![]() 下面我们一 一来验证下属性,下面的队列中的RoutingKey 很多我都直接用队列名做RoutingKey了,事先声明下 2.参数解析 2.1 Message TTL我们新建一个TTL的队列, 设置x-message-ttl:10000,即10s,发布一条消息,看这个消息如果超过生存周期,如何处理 队列多长时间(毫秒)没有被使用(访问)就会被删除.换个说法就是,当队列在指定的时间内没有被使用(访问)就会被删除. 新建一个队列,持久化,然后设置 周期20s,设置x-expires:20000
队列可以容纳的消息的最大条数,超过这个条数,队列头部的消息将会被丢弃,注意是队列头部的消息被丢弃 现在我们创建队列容纳5个消息 生产者生产5条消息 package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxLengthProducer { /** * 队列名字 */ private final static String MAX_LENGTH_QUEUE_NAME = "max_length_queue"; public static void maxLengthProducer(Integer count) throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length", 5); channel.queueDeclare(MAX_LENGTH_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ for (int i = 0; i < count; i++) { // 消息内容 String message = "i=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", MAX_LENGTH_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); } //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 5条消息 maxLengthProducer(5); //生产 10条消息 // maxLengthProducer(10); } }5条消息 都在队列中 然后再修改代码生产10条消息,可以看到消息清零后,又产生5条,队列中也只有5条 看下是什么消息,可以看到消息ID为 6,7,8,9,10,已经将前5条消息丢弃掉了,只保留了最后的5条 队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃. 我们新建一个Max length bytes 字节为3 的队列,为什么是3,因为一个汉字UTF-8编码是3个字节,所以我们输入看是否内容会被截断 队列创建 package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxLengthBytesProducer { /** * 队列名字 */ private final static String MAX_LENGTH__BYTES_QUEUE_NAME = "max_length_bytes_queue"; public static void maxLengthBytesProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length-bytes", 3); channel.queueDeclare(MAX_LENGTH__BYTES_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = new String("测试一下".getBytes(), StandardCharsets.UTF_8); channel.basicPublish("", MAX_LENGTH__BYTES_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 5条消息 maxLengthBytesProducer(); } }然后启动生产者,生产消息,看队列中是否
官方 : Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head or reject-publish. 队列中的消息溢出时,如何处理这些消息.要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息.( 从上面两个参数的测试中可以看出,“drop-head” 应该是默认行为) 我们新建一个队列,看下队列溢出时候,如何处理消息,我们先设置队列长度为2,然后生产3条消息,看下默认处理 package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class OverflowDropProducer { /** * 队列名字 */ private final static String OVER_DROP_QUEUE_NAME = "over_drop_queue"; public static void overflowDropProducer(Integer count) throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-length", 2); // arguments.put("x-overflow", "drop-head"); //不设置试试,消息如何处理,看下默认情况 channel.queueDeclare(OVER_DROP_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ for (int i = 0; i < count; i++) { // 消息内容 String message = "i=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", OVER_DROP_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); } //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 10条消息 overflowDropProducer(3); } }生产3条消息 因为rabbitmq不允许对已有队列的参数进行修改,我们现在我们删除队列 然后重新运行生产者,设置下参数 arguments.put(“x-overflow”, “drop-head”); 看看是不是一样的效果,可以验证,和默认是一样的 现在我们设置一下参数 arguments.put(“x-overflow”, “reject-publish”); 看下有什么不一样,依旧删除队列,修改代码 Map arguments = new HashMap(); arguments.put("x-max-length", 2); arguments.put("x-overflow", "reject-publish"); //不设置试试,消息如何处理,看下默认情况 channel.queueDeclare(OVER_DROP_QUEUE_NAME, false, false, false, arguments);依旧生产3条消息 下一章专门讲 死信队列和死信队列路由 2.7 Dead letter routing key下一章专门讲 死信队列和死信队列路由 2.8 Maximum priority设置该队列中的消息的优先级最大值.发布消息的时候,可以指定消息的优先级,优先级高的先被消费.如果没有设置该参数,那么该队列不支持消息优先级功能.即使发布消息的时候传入了优先级的值,也不会起什么作用 设置队列arguments.put(“x-max-priority”, 100); 我们发送6条消息,分别设置标识id为 1, 50, 255,200,255-1,256 看下6条优先级消息的处理情况 分析: 为什么要发6条消息,x-max-priority 最大优先级范围在 0~255之间,所以我们设置了 两个255的消息,id分别为255和255-1,到时候看下这两个消息的优先级,而且我们还设置了一个 256的id,看下这个的优先级,注意我是先放255,再放200,再放255-1的 package queue.params; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class MaxPriorityProducer { /** * 队列名字 */ private final static String MAX_PRIORITY_QUEUE_NAME = "max_priority_queue"; public static void maxPriorityProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-max-priority", 100); channel.queueDeclare(MAX_PRIORITY_QUEUE_NAME, false, false, false, arguments); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .priority(1) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 * * 设置优先级 为 1的消息 */ String message1 = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop1 = new AMQP.BasicProperties().builder() .priority(1) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop1, message1.getBytes()); System.out.println(" **** Producer Sent Message: [" + message1 + "]"); //设置优先级为 50 的 消息 String message50 = "i=50" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop50 = new AMQP.BasicProperties().builder() .priority(50) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop50, message50.getBytes()); System.out.println(" **** Producer Sent Message: [" + message50 + "]"); //设置优先级为 255 的 消息 String message255 = "i=255" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop255 = new AMQP.BasicProperties().builder() .priority(255) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop255, message255.getBytes()); System.out.println(" **** Producer Sent Message: [" + message255 + "]"); //设置优先级为 200 的 消息 String message200 = "i=200" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop200 = new AMQP.BasicProperties().builder() .priority(200) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop200, message200.getBytes()); System.out.println(" **** Producer Sent Message: [" + message200 + "]"); //设置优先级为 255 的 消息 String message255_1 = "i=255-1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop255_1 = new AMQP.BasicProperties().builder() .priority(255) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop255_1, message255_1.getBytes()); System.out.println(" **** Producer Sent Message: [" + message255_1 + "]"); //设置优先级为 256 的 消息 String message256 = "i=256" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); AMQP.BasicProperties prop256 = new AMQP.BasicProperties().builder() .priority(256) // 传送方式 .contentEncoding("UTF-8") // 编码方式 .build(); channel.basicPublish("", MAX_PRIORITY_QUEUE_NAME, prop256, message256.getBytes()); System.out.println(" **** Producer Sent Message: [" + message256 + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产 消息 maxPriorityProducer(); } }生产6条消息 看下队列中的消息 我们看下消息,获取消息时候优先消费谁呢?这个200、255、255-1、256的顺序如何定义 分析: 我们发送顺序是 1, 50, 255,200,255-1,256 x-max-priority=100 1. 大于max-priority 小于 256的优先级 超出队列定义100 且小于256的 这一部分 就是 255、200、255-1,他们三条消息是优先级平等的,都按照最大优先级设置、且消费顺序按照先进先出,255先消费、200第二消费、255-1最后消费 2. 大于255的,比如我们设置的256 大于255,队列优先级范围的,同意按照优先级 0来处理,所以256应该是最后消费的,优先级比 1还小 3. 在max-prority范围内的,也就是小于100的这一部分 按照正常优先级顺序,先消费大的,在消费小的 综上所述,我们的消费顺序应该是 255、200、255-1、50、1、256这个顺序,我们来验证一下吧,查看队列消息 懒人模式.该模式下的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用.当消费者开始消费的时候才加载到内存中;如果没有设置懒人模式,队列则会直接利用内存缓存,以最快的速度传递消息. 生产两个队列的消息,看下消息的区别 package queue.params; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import conn.MqConnectUtil; import java.time.LocalDate; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; /** * 当前描述:生产者 * * @author: jiazijie * @since: 2020/6/10 下午11:14 */ public class LazyModeProducer { /** * 队列名字 */ private final static String LAZY_QUEUE_NAME = "lazy_mode_queue"; private final static String NORMAL_QUEUE_NAME = "normal_mode_queue"; public static void lazyModeProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ Map arguments = new HashMap(); arguments.put("x-queue-mode", "lazy"); channel.queueDeclare(LAZY_QUEUE_NAME, false, false, false, arguments); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", LAZY_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void normalModeProducer() throws Exception { // 获取到连接以及mq通道 Connection connection = MqConnectUtil.getConnectionDefault(); // 从连接中创建通道 Channel channel = connection.createChannel(); /* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) * queue - 队列名 * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失 * exclusie - 是否排外的,仅限于当前队列使用 * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等 */ channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, null); /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发 * queuename - 队列信息 * props - 参数信息 * message 消息体 byte[]类型 */ // 消息内容 String message = "i=2" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now(); channel.basicPublish("", NORMAL_QUEUE_NAME, null, message.getBytes()); System.out.println(" **** Producer Sent Message: [" + message + "]"); //关闭通道和连接 channel.close(); connection.close(); } public static void main(String[] args) throws Exception { //生产消息 lazyModeProducer(); normalModeProducer(); } }看下队列中的消息 这个是关于集群配置的,把队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则 下一章 RabbitMQ系列(十)RabbitMQ进阶-Queue队列参数详解-死信交换机 |
今日新闻 |
点击排行 |
|
推荐新闻 |
图片新闻 |
|
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭 |